fix: persist side-effect columns to row buffer in async engine #524
`_run_cell` and `_run_batch` only wrote columns tracked in `_instance_to_columns` back to the buffer, silently dropping side-effect columns like `__trace` and `__reasoning_content`. Downstream columns referencing these values would fail with missing column errors. After writing tracked output columns, now also persist any new keys from the generator result that weren't in the input row data. Made-with: Cursor
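The second write pass described in the commit message above can be sketched roughly as follows. This is a minimal illustration rather than the PR's code: `write_back`, the plain-dict row buffer, and the sample values are assumptions made for the example.

```python
from typing import Any


def write_back(
    buffer_row: dict[str, Any],
    input_row: dict[str, Any],
    result: dict[str, Any],
    tracked_columns: list[str],
) -> None:
    """Copy generator output into a row buffer (hypothetical helper)."""
    # First pass: tracked output columns, as before the fix.
    for col in tracked_columns:
        if col in result:
            buffer_row[col] = result[col]
    # Second pass: keys the generator added that were not in the input row.
    for key, value in result.items():
        if key not in input_row and key not in tracked_columns:
            buffer_row[key] = value


# Hypothetical row: the generator for "answer" also emits a side-effect column.
input_row = {"seed": "q1"}
result = {"seed": "q1", "answer": "42", "answer__reasoning_content": "because"}
buffer_row = dict(input_row)
write_back(buffer_row, input_row, result, tracked_columns=["answer"])
```

Without the second loop, `answer__reasoning_content` would never reach the buffer, which is the failure mode the commit message describes.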
Code Review: PR #524, fix: persist side-effect columns to row buffer in async engine

Summary

This PR fixes a bug where the async engine's
The fix adds a second write pass in both

Files changed: 2 (86 additions, 1 deletion)
CI status: Lint passes. Most test/E2E jobs still pending at review time. Config tests all pass.

Findings
1.
Greptile Summary

This PR fixes a silent data-loss bug in the async scheduler where side-effect columns produced by generators (e.g. The fix introduces a separate

| Filename | Overview |
|---|---|
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py | Adds side_effect_map parameter and _instance_to_write_columns dict; all three write-back paths correctly switched from _instance_to_columns to _instance_to_write_columns. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py | Correctly builds side_effect_map from gen.config.side_effect_columns / sub.side_effect_columns and passes it to AsyncTaskScheduler; handles both MultiColumnConfig and single-column cases. |
| packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py | New test verifies that side-effect columns are written to the buffer after a cell-by-cell generator run; covers the core bug scenario with answer__reasoning_content. |
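
The relationship between the two maps named in the table can be sketched as below. This is an assumption-laden illustration: `build_write_columns` is a hypothetical helper, and the integer instance keys and list-of-column values are guesses at the shape of `_instance_to_columns`, not the scheduler's confirmed types.

```python
def build_write_columns(
    instance_to_columns: dict[int, list[str]],
    side_effect_map: dict[str, str],
) -> dict[int, list[str]]:
    """Return a new map whose entries also list side-effect columns."""
    write_columns: dict[int, list[str]] = {}
    for instance_id, columns in instance_to_columns.items():
        # Side-effect columns whose primary column belongs to this instance.
        extra = [
            se_col
            for se_col, primary in side_effect_map.items()
            if primary in columns and se_col not in columns
        ]
        # Build a fresh list so the tracking map is left untouched.
        write_columns[instance_id] = [*columns, *extra]
    return write_columns


# Hypothetical instance ids 1 and 2 stand in for two generators.
instance_to_columns = {1: ["answer"], 2: ["judge"]}
side_effect_map = {"answer__reasoning_content": "answer"}
write_columns = build_write_columns(instance_to_columns, side_effect_map)
```

Keeping the original map unmodified matters because it is still used for completion tracking, where side-effect columns should not count as separate outputs.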
Sequence Diagram
sequenceDiagram
participant DB as DatasetBuilder
participant AS as AsyncTaskScheduler
participant Gen as ColumnGenerator
participant BM as RowGroupBufferManager
DB->>DB: build side_effect_map
DB->>AS: AsyncTaskScheduler(..., side_effect_map)
AS->>AS: build _instance_to_write_columns
loop Cell-by-cell task for answer
AS->>BM: get_row(rg, ri)
AS->>Gen: agenerate(row_data)
Gen-->>AS: result with answer + answer__reasoning_content
AS->>BM: update_cell answer
AS->>BM: update_cell answer__reasoning_content
end
loop Cell-by-cell task for judge
AS->>BM: get_row includes answer__reasoning_content
AS->>Gen: agenerate(row_data)
end
Reviews (2): Last reviewed commit: "try a better fix"
    strategies[gen.config.name] = gen.get_generation_strategy()
    gen_map[gen.config.name] = gen
    for se_col in gen.config.side_effect_columns:
        side_effect_map[se_col] = gen.config.name

ExecutionGraph.create() on line 342 already builds the same {side_effect_col: primary_col} mapping internally (_side_effect_map). might be worth exposing that as a read-only property on the graph and reading it here instead of building it independently - keeps the graph as the single source of truth and avoids the two copies drifting apart if side-effect logic gets more complex later.
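
One way the suggestion above could look, as a sketch only: `ExecutionGraphSketch` and `register_side_effect` are hypothetical stand-ins, and nothing here reflects the real `ExecutionGraph` beyond the `_side_effect_map` name mentioned in the comment.

```python
from types import MappingProxyType
from typing import Mapping


class ExecutionGraphSketch:
    """Hypothetical stand-in for a graph that owns the side-effect mapping."""

    def __init__(self) -> None:
        self._side_effect_map: dict[str, str] = {}

    def register_side_effect(self, side_effect_col: str, primary_col: str) -> None:
        # Hypothetical registration hook; the real graph builds this internally.
        self._side_effect_map[side_effect_col] = primary_col

    @property
    def side_effect_map(self) -> Mapping[str, str]:
        # A read-only view: callers can look up entries but cannot mutate them.
        return MappingProxyType(self._side_effect_map)


graph = ExecutionGraphSketch()
graph.register_side_effect("answer__reasoning_content", "answer")
view = graph.side_effect_map
```

The builder would then read `graph.side_effect_map` instead of assembling its own copy, so there is only one place where the mapping is defined.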
        row_groups=row_groups,
        buffer_manager=buffer_manager,
        side_effect_map={side_effect_col: "answer"},
    )

nit: this test exercises the scheduler directly, which proves the mechanism works, but doesn't go through _prepare_async_run() where the production wiring happens. if someone refactors the builder and forgets to pass side_effect_map, this test would still pass. maybe worth an integration-level test through build_preview() too?
        "seed": MockSeedGenerator(config=_expr_config("seed"), resource_provider=provider),
        "answer": answer_gen,
        "judge": MockCellGenerator(config=_expr_config("judge"), resource_provider=provider),
    }

nit: the judge mock doesn't actually consume answer__reasoning_content from its input data, so this doesn't fully prove that downstream columns can read the side-effect value from the buffer. maybe worth swapping in a mock that asserts data[side_effect_col] is present? current test still catches the original bug though, so no blocker.
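
A sketch of the kind of asserting mock suggested above. `AssertingJudgeMock` is hypothetical and does not subclass the test suite's `MockCellGenerator`, whose interface is not shown here; the `agenerate(data)` shape is assumed from the sequence diagram.

```python
import asyncio
from typing import Any


class AssertingJudgeMock:
    """Hypothetical downstream generator that requires a side-effect column."""

    def __init__(self, required_column: str) -> None:
        self.required_column = required_column
        self.seen_values: list[Any] = []

    async def agenerate(self, data: dict[str, Any]) -> dict[str, Any]:
        # Fail loudly if the scheduler did not persist the side-effect column.
        assert self.required_column in data, f"missing {self.required_column}"
        self.seen_values.append(data[self.required_column])
        return {**data, "judge": "ok"}


side_effect_col = "answer__reasoning_content"
judge = AssertingJudgeMock(side_effect_col)
row = {"answer": "42", side_effect_col: "step by step"}
out = asyncio.run(judge.agenerate(row))
```

Recording `seen_values` lets a test also check what the downstream generator actually received, not just that the key existed.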
        tracker: CompletionTracker,
        row_groups: list[tuple[int, int]],
        buffer_manager: RowGroupBufferManager | None = None,
        side_effect_map: dict[str, str] | None = None,

minor: side_effect_map is positional here but all the other optional config params are keyword-only (after *). might want to move it after * for consistency?
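
For illustration, moving the parameter after `*` would look roughly like this. `SchedulerSketch` is a hypothetical, trimmed-down class, not the real `AsyncTaskScheduler` signature.

```python
from typing import Optional


class SchedulerSketch:
    """Hypothetical scheduler showing a keyword-only optional parameter."""

    def __init__(
        self,
        row_groups: list[tuple[int, int]],
        buffer_manager: Optional[object] = None,
        *,  # everything after this marker must be passed by keyword
        side_effect_map: Optional[dict[str, str]] = None,
    ) -> None:
        self.row_groups = row_groups
        self.buffer_manager = buffer_manager
        # Normalize None to an empty dict so later lookups need no None check.
        self.side_effect_map = dict(side_effect_map or {})


scheduler = SchedulerSketch(
    [(0, 2)],
    side_effect_map={"answer__reasoning_content": "answer"},
)
```

With this signature, a call that passes the map positionally raises `TypeError`, so call sites stay explicit about which optional argument they are setting.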

Hey @nabinchha - heads up that #509 addresses the same root cause and also fixes a related bug in Want to sync up so we're not duplicating effort? Happy to merge the approaches - the

@andreatgretel you are right! This is a duplicate of #509. Will close.

📋 Summary

The async engine's `_run_cell`, `_run_batch`, and `_run_from_scratch` only wrote columns tracked in `_instance_to_columns` back to the `RowGroupBufferManager`, silently dropping side-effect columns produced by generators (e.g. `__trace`, `__reasoning_content`). Downstream columns referencing these values would fail with missing column errors.

This fix introduces an explicit `_instance_to_write_columns` map that extends `_instance_to_columns` with side-effect columns declared by each config. The original `_instance_to_columns` remains unchanged for completion tracking and dispatch dedup, while `_instance_to_write_columns` is used at all three buffer write-back sites.

🔗 Related Issue

Fixes #523

🔄 Changes

🔧 Changed

- `dataset_builder.py`: In `_prepare_async_run`, collect a `side_effect_map` (side-effect column → primary column) from each generator's config and pass it to `AsyncTaskScheduler`
- `async_scheduler.py`: Accept `side_effect_map` in `__init__`, build `_instance_to_write_columns` by extending `_instance_to_columns` with side-effect entries. Use `_instance_to_write_columns` at the three buffer write sites (`_run_from_scratch`, `_run_cell`, `_run_batch`)

🧪 Tests

- `test_async_scheduler.py`: Added `MockCellGeneratorWithSideEffect` and `test_scheduler_side_effect_columns_written_to_buffer`, which verifies side-effect columns are persisted to the buffer and available to downstream generators. Config uses `extract_reasoning_content=True` so the `ExecutionGraph` properly registers the side-effect alias.

🔍 Attention Areas

- `async_scheduler.py`, `_instance_to_write_columns` construction: New map that extends `_instance_to_columns` with side-effect columns. `_instance_to_columns` is intentionally left unchanged for `CompletionTracker` and dispatch dedup.

🧪 Testing

- `make test` passes

✅ Checklist

Made with Cursor